# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from mozpack.copier import (
    FileCopier,
    FileRegistry,
    FileRegistrySubtree,
    Jarrer,
)
from mozpack.files import (
    GeneratedFile,
    ExistingFile,
)
from mozpack.mozjar import JarReader
import mozpack.path as mozpath
import unittest
import mozunit
import os
import stat
from mozpack.errors import ErrorMessage
from mozpack.test.test_files import (
    MockDest,
    MatchTestTemplate,
    TestWithTmpDir,
)


class BaseTestFileRegistry(MatchTestTemplate):
    def add(self, path):
        self.registry.add(path, GeneratedFile(path))

    def do_check(self, pattern, result):
        self.checked = True
        if result:
            self.assertTrue(self.registry.contains(pattern))
        else:
            self.assertFalse(self.registry.contains(pattern))
        self.assertEqual(self.registry.match(pattern), result)

    def do_test_file_registry(self, registry):
        self.registry = registry
        self.registry.add('foo', GeneratedFile('foo'))
        bar = GeneratedFile('bar')
        self.registry.add('bar', bar)
        self.assertEqual(self.registry.paths(), ['foo', 'bar'])
        self.assertEqual(self.registry['bar'], bar)

        self.assertRaises(ErrorMessage, self.registry.add, 'foo',
                          GeneratedFile('foo2'))

        self.assertRaises(ErrorMessage, self.registry.remove, 'qux')

        self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar',
                          GeneratedFile('foobar'))
        self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar/baz',
                          GeneratedFile('foobar'))

        self.assertEqual(self.registry.paths(), ['foo', 'bar'])

        self.registry.remove('foo')
        self.assertEqual(self.registry.paths(), ['bar'])
        self.registry.remove('bar')
        self.assertEqual(self.registry.paths(), [])

        self.prepare_match_test()
        self.do_match_test()
        self.assertTrue(self.checked)
        self.assertEqual(self.registry.paths(), [
            'bar',
            'foo/bar',
            'foo/baz',
            'foo/qux/1',
            'foo/qux/bar',
            'foo/qux/2/test',
            'foo/qux/2/test2',
        ])

        self.registry.remove('foo/qux')
        self.assertEqual(self.registry.paths(), ['bar', 'foo/bar', 'foo/baz'])

        self.registry.add('foo/qux', GeneratedFile('fooqux'))
        self.assertEqual(self.registry.paths(), ['bar', 'foo/bar', 'foo/baz',
                                                 'foo/qux'])
        self.registry.remove('foo/b*')
        self.assertEqual(self.registry.paths(), ['bar', 'foo/qux'])

        self.assertEqual([f for f, c in self.registry], ['bar', 'foo/qux'])
        self.assertEqual(len(self.registry), 2)

        self.add('foo/.foo')
        self.assertTrue(self.registry.contains('foo/.foo'))

    def do_test_registry_paths(self, registry):
        self.registry = registry

        # Can't add a file if it requires a directory in place of a
        # file we also require.
        self.registry.add('foo', GeneratedFile('foo'))
        self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar',
                          GeneratedFile('foobar'))

        # Can't add a file if we already have a directory there.
        self.registry.add('bar/baz', GeneratedFile('barbaz'))
        self.assertRaises(ErrorMessage, self.registry.add, 'bar',
                          GeneratedFile('bar'))

        # Bump the count of things that require bar/ to 2.
        self.registry.add('bar/zot', GeneratedFile('barzot'))
        self.assertRaises(ErrorMessage, self.registry.add, 'bar',
                          GeneratedFile('bar'))

        # Drop the count of things that require bar/ to 1.
        self.registry.remove('bar/baz')
        self.assertRaises(ErrorMessage, self.registry.add, 'bar',
                          GeneratedFile('bar'))

        # Drop the count of things that require bar/ to 0.
        self.registry.remove('bar/zot')
        self.registry.add('bar/zot', GeneratedFile('barzot'))

class TestFileRegistry(BaseTestFileRegistry, unittest.TestCase):
    def test_partial_paths(self):
        cases = {
            'foo/bar/baz/zot': ['foo/bar/baz', 'foo/bar', 'foo'],
            'foo/bar': ['foo'],
            'bar': [],
        }
        reg = FileRegistry()
        for path, parts in cases.iteritems():
            self.assertEqual(reg._partial_paths(path), parts)

    def test_file_registry(self):
        self.do_test_file_registry(FileRegistry())

    def test_registry_paths(self):
        self.do_test_registry_paths(FileRegistry())

    def test_required_directories(self):
        self.registry = FileRegistry()

        self.registry.add('foo', GeneratedFile('foo'))
        self.assertEqual(self.registry.required_directories(), set())

        self.registry.add('bar/baz', GeneratedFile('barbaz'))
        self.assertEqual(self.registry.required_directories(), {'bar'})

        self.registry.add('bar/zot', GeneratedFile('barzot'))
        self.assertEqual(self.registry.required_directories(), {'bar'})

        self.registry.add('bar/zap/zot', GeneratedFile('barzapzot'))
        self.assertEqual(self.registry.required_directories(), {'bar', 'bar/zap'})

        self.registry.remove('bar/zap/zot')
        self.assertEqual(self.registry.required_directories(), {'bar'})

        self.registry.remove('bar/baz')
        self.assertEqual(self.registry.required_directories(), {'bar'})

        self.registry.remove('bar/zot')
        self.assertEqual(self.registry.required_directories(), set())

        self.registry.add('x/y/z', GeneratedFile('xyz'))
        self.assertEqual(self.registry.required_directories(), {'x', 'x/y'})


class TestFileRegistrySubtree(BaseTestFileRegistry, unittest.TestCase):
    def test_file_registry_subtree_base(self):
        registry = FileRegistry()
        self.assertEqual(registry, FileRegistrySubtree('', registry))
        self.assertNotEqual(registry, FileRegistrySubtree('base', registry))

    def create_registry(self):
        registry = FileRegistry()
        registry.add('foo/bar', GeneratedFile('foo/bar'))
        registry.add('baz/qux', GeneratedFile('baz/qux'))
        return FileRegistrySubtree('base/root', registry)

    def test_file_registry_subtree(self):
        self.do_test_file_registry(self.create_registry())

    def test_registry_paths_subtree(self):
        registry = FileRegistry()
        self.do_test_registry_paths(self.create_registry())


class TestFileCopier(TestWithTmpDir):
    def all_dirs(self, base):
        all_dirs = set()
        for root, dirs, files in os.walk(base):
            if not dirs:
                all_dirs.add(mozpath.relpath(root, base))
        return all_dirs

    def all_files(self, base):
        all_files = set()
        for root, dirs, files in os.walk(base):
            for f in files:
                all_files.add(
                    mozpath.join(mozpath.relpath(root, base), f))
        return all_files

    def test_file_copier(self):
        copier = FileCopier()
        copier.add('foo/bar', GeneratedFile('foobar'))
        copier.add('foo/qux', GeneratedFile('fooqux'))
        copier.add('foo/deep/nested/directory/file', GeneratedFile('fooz'))
        copier.add('bar', GeneratedFile('bar'))
        copier.add('qux/foo', GeneratedFile('quxfoo'))
        copier.add('qux/bar', GeneratedFile(''))

        result = copier.copy(self.tmpdir)
        self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
        self.assertEqual(self.all_dirs(self.tmpdir),
                         set(['foo/deep/nested/directory', 'qux']))

        self.assertEqual(result.updated_files, set(self.tmppath(p) for p in
            self.all_files(self.tmpdir)))
        self.assertEqual(result.existing_files, set())
        self.assertEqual(result.removed_files, set())
        self.assertEqual(result.removed_directories, set())

        copier.remove('foo')
        copier.add('test', GeneratedFile('test'))
        result = copier.copy(self.tmpdir)
        self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
        self.assertEqual(self.all_dirs(self.tmpdir), set(['qux']))
        self.assertEqual(result.removed_files, set(self.tmppath(p) for p in
            ('foo/bar', 'foo/qux', 'foo/deep/nested/directory/file')))

    def test_symlink_directory_replaced(self):
        """Directory symlinks in destination are replaced if they need to be
        real directories."""
        if not self.symlink_supported:
            return

        dest = self.tmppath('dest')

        copier = FileCopier()
        copier.add('foo/bar/baz', GeneratedFile('foobarbaz'))

        os.makedirs(self.tmppath('dest/foo'))
        dummy = self.tmppath('dummy')
        os.mkdir(dummy)
        link = self.tmppath('dest/foo/bar')
        os.symlink(dummy, link)

        result = copier.copy(dest)

        st = os.lstat(link)
        self.assertFalse(stat.S_ISLNK(st.st_mode))
        self.assertTrue(stat.S_ISDIR(st.st_mode))

        self.assertEqual(self.all_files(dest), set(copier.paths()))

        self.assertEqual(result.removed_directories, set())
        self.assertEqual(len(result.updated_files), 1)

    def test_remove_unaccounted_directory_symlinks(self):
        """Directory symlinks in destination that are not in the way are
        deleted according to remove_unaccounted and
        remove_all_directory_symlinks.
        """
        if not self.symlink_supported:
            return

        dest = self.tmppath('dest')

        copier = FileCopier()
        copier.add('foo/bar/baz', GeneratedFile('foobarbaz'))

        os.makedirs(self.tmppath('dest/foo'))
        dummy = self.tmppath('dummy')
        os.mkdir(dummy)

        os.mkdir(self.tmppath('dest/zot'))
        link = self.tmppath('dest/zot/zap')
        os.symlink(dummy, link)

        # If not remove_unaccounted but remove_empty_directories, then
        # the symlinked directory remains (as does its containing
        # directory).
        result = copier.copy(dest, remove_unaccounted=False,
            remove_empty_directories=True,
            remove_all_directory_symlinks=False)

        st = os.lstat(link)
        self.assertTrue(stat.S_ISLNK(st.st_mode))
        self.assertFalse(stat.S_ISDIR(st.st_mode))

        self.assertEqual(self.all_files(dest), set(copier.paths()))
        self.assertEqual(self.all_dirs(dest), set(['foo/bar']))

        self.assertEqual(result.removed_directories, set())
        self.assertEqual(len(result.updated_files), 1)

        # If remove_unaccounted but not remove_empty_directories, then
        # only the symlinked directory is removed.
        result = copier.copy(dest, remove_unaccounted=True,
            remove_empty_directories=False,
            remove_all_directory_symlinks=False)

        st = os.lstat(self.tmppath('dest/zot'))
        self.assertFalse(stat.S_ISLNK(st.st_mode))
        self.assertTrue(stat.S_ISDIR(st.st_mode))

        self.assertEqual(result.removed_files, set([link]))
        self.assertEqual(result.removed_directories, set())

        self.assertEqual(self.all_files(dest), set(copier.paths()))
        self.assertEqual(self.all_dirs(dest), set(['foo/bar', 'zot']))

        # If remove_unaccounted and remove_empty_directories, then
        # both the symlink and its containing directory are removed.
        link = self.tmppath('dest/zot/zap')
        os.symlink(dummy, link)

        result = copier.copy(dest, remove_unaccounted=True,
            remove_empty_directories=True,
            remove_all_directory_symlinks=False)

        self.assertEqual(result.removed_files, set([link]))
        self.assertEqual(result.removed_directories, set([self.tmppath('dest/zot')]))

        self.assertEqual(self.all_files(dest), set(copier.paths()))
        self.assertEqual(self.all_dirs(dest), set(['foo/bar']))

    def test_permissions(self):
        """Ensure files without write permission can be deleted."""
        with open(self.tmppath('dummy'), 'a'):
            pass

        p = self.tmppath('no_perms')
        with open(p, 'a'):
            pass

        # Make file and directory unwritable. Reminder: making a directory
        # unwritable prevents modifications (including deletes) from the list
        # of files in that directory.
        os.chmod(p, 0o400)
        os.chmod(self.tmpdir, 0o400)

        copier = FileCopier()
        copier.add('dummy', GeneratedFile('content'))
        result = copier.copy(self.tmpdir)
        self.assertEqual(result.removed_files_count, 1)
        self.assertFalse(os.path.exists(p))

    def test_no_remove(self):
        copier = FileCopier()
        copier.add('foo', GeneratedFile('foo'))

        with open(self.tmppath('bar'), 'a'):
            pass

        os.mkdir(self.tmppath('emptydir'))
        d = self.tmppath('populateddir')
        os.mkdir(d)

        with open(self.tmppath('populateddir/foo'), 'a'):
            pass

        result = copier.copy(self.tmpdir, remove_unaccounted=False)

        self.assertEqual(self.all_files(self.tmpdir), set(['foo', 'bar',
            'populateddir/foo']))
        self.assertEqual(self.all_dirs(self.tmpdir), set(['populateddir']))
        self.assertEqual(result.removed_files, set())
        self.assertEqual(result.removed_directories,
            set([self.tmppath('emptydir')]))

    def test_no_remove_empty_directories(self):
        copier = FileCopier()
        copier.add('foo', GeneratedFile('foo'))

        with open(self.tmppath('bar'), 'a'):
            pass

        os.mkdir(self.tmppath('emptydir'))
        d = self.tmppath('populateddir')
        os.mkdir(d)

        with open(self.tmppath('populateddir/foo'), 'a'):
            pass

        result = copier.copy(self.tmpdir, remove_unaccounted=False,
            remove_empty_directories=False)

        self.assertEqual(self.all_files(self.tmpdir), set(['foo', 'bar',
            'populateddir/foo']))
        self.assertEqual(self.all_dirs(self.tmpdir), set(['emptydir',
            'populateddir']))
        self.assertEqual(result.removed_files, set())
        self.assertEqual(result.removed_directories, set())

    def test_optional_exists_creates_unneeded_directory(self):
        """Demonstrate that a directory not strictly required, but specified
        as the path to an optional file, will be unnecessarily created.

        This behaviour is wrong; fixing it is tracked by Bug 972432;
        and this test exists to guard against unexpected changes in
        behaviour.
        """

        dest = self.tmppath('dest')

        copier = FileCopier()
        copier.add('foo/bar', ExistingFile(required=False))

        result = copier.copy(dest)

        st = os.lstat(self.tmppath('dest/foo'))
        self.assertFalse(stat.S_ISLNK(st.st_mode))
        self.assertTrue(stat.S_ISDIR(st.st_mode))

        # What's worse, we have no record that dest was created.
        self.assertEquals(len(result.updated_files), 0)

        # But we do have an erroneous record of an optional file
        # existing when it does not.
        self.assertIn(self.tmppath('dest/foo/bar'), result.existing_files)

    def test_remove_unaccounted_file_registry(self):
        """Test FileCopier.copy(remove_unaccounted=FileRegistry())"""

        dest = self.tmppath('dest')

        copier = FileCopier()
        copier.add('foo/bar/baz', GeneratedFile('foobarbaz'))
        copier.add('foo/bar/qux', GeneratedFile('foobarqux'))
        copier.add('foo/hoge/fuga', GeneratedFile('foohogefuga'))
        copier.add('foo/toto/tata', GeneratedFile('footototata'))

        os.makedirs(os.path.join(dest, 'bar'))
        with open(os.path.join(dest, 'bar', 'bar'), 'w') as fh:
            fh.write('barbar');
        os.makedirs(os.path.join(dest, 'foo', 'toto'))
        with open(os.path.join(dest, 'foo', 'toto', 'toto'), 'w') as fh:
            fh.write('foototototo');

        result = copier.copy(dest, remove_unaccounted=False)

        self.assertEqual(self.all_files(dest),
                         set(copier.paths()) | { 'foo/toto/toto', 'bar/bar'})
        self.assertEqual(self.all_dirs(dest),
                         {'foo/bar', 'foo/hoge', 'foo/toto', 'bar'})

        copier2 = FileCopier()
        copier2.add('foo/hoge/fuga', GeneratedFile('foohogefuga'))

        # We expect only files copied from the first copier to be removed,
        # not the extra file that was there beforehand.
        result = copier2.copy(dest, remove_unaccounted=copier)

        self.assertEqual(self.all_files(dest),
                         set(copier2.paths()) | { 'foo/toto/toto', 'bar/bar'})
        self.assertEqual(self.all_dirs(dest),
                         {'foo/hoge', 'foo/toto', 'bar'})
        self.assertEqual(result.updated_files,
                         {self.tmppath('dest/foo/hoge/fuga')})
        self.assertEqual(result.existing_files, set())
        self.assertEqual(result.removed_files, {self.tmppath(p) for p in
            ('dest/foo/bar/baz', 'dest/foo/bar/qux', 'dest/foo/toto/tata')})
        self.assertEqual(result.removed_directories,
                         {self.tmppath('dest/foo/bar')})


class TestJarrer(unittest.TestCase):
    def check_jar(self, dest, copier):
        jar = JarReader(fileobj=dest)
        self.assertEqual([f.filename for f in jar], copier.paths())
        for f in jar:
            self.assertEqual(f.uncompressed_data.read(),
                             copier[f.filename].content)

    def test_jarrer(self):
        copier = Jarrer()
        copier.add('foo/bar', GeneratedFile('foobar'))
        copier.add('foo/qux', GeneratedFile('fooqux'))
        copier.add('foo/deep/nested/directory/file', GeneratedFile('fooz'))
        copier.add('bar', GeneratedFile('bar'))
        copier.add('qux/foo', GeneratedFile('quxfoo'))
        copier.add('qux/bar', GeneratedFile(''))

        dest = MockDest()
        copier.copy(dest)
        self.check_jar(dest, copier)

        copier.remove('foo')
        copier.add('test', GeneratedFile('test'))
        copier.copy(dest)
        self.check_jar(dest, copier)

        copier.remove('test')
        copier.add('test', GeneratedFile('replaced-content'))
        copier.copy(dest)
        self.check_jar(dest, copier)

        copier.copy(dest)
        self.check_jar(dest, copier)

        preloaded = ['qux/bar', 'bar']
        copier.preload(preloaded)
        copier.copy(dest)

        dest.seek(0)
        jar = JarReader(fileobj=dest)
        self.assertEqual([f.filename for f in jar], preloaded +
                         [p for p in copier.paths() if not p in preloaded])
        self.assertEqual(jar.last_preloaded, preloaded[-1])


    def test_jarrer_compress(self):
        copier = Jarrer()
        copier.add('foo/bar', GeneratedFile('ffffff'))
        copier.add('foo/qux', GeneratedFile('ffffff'), compress=False)

        dest = MockDest()
        copier.copy(dest)
        self.check_jar(dest, copier)

        dest.seek(0)
        jar = JarReader(fileobj=dest)
        self.assertTrue(jar['foo/bar'].compressed)
        self.assertFalse(jar['foo/qux'].compressed)


if __name__ == '__main__':
    mozunit.main()
